Solutions/Trend Micro Vision One/Data Connectors/AzureFunctionTrendMicroXDR/shared_code/transform_utils.py
import json
from typing import Any, List, Optional
import orjson
import os
from shared_code.customized_logger.customized_json_logger import (
get_customized_json_logger,
)
from shared_code.models.oat import (
ALLOWED_OAT_FIELDS,
ALLOWED_OAT_DETAIL_FIELDS,
OAT_OVERSIZED_FIELDS,
)
from shared_code.models.rca import RCA_RESULT_OVERSIZED_FIELDS
from shared_code.models.workbench import (
ALLOWED_WORKBENCH_COLUMN,
ALLOWED_WORKBENCH_DETAIL_COLUMN,
XDR_INDICATORS_COLUMN_NAME,
WB_OVERSIZED_FIELDS,
)
logger = get_customized_json_logger()
FIELD_BYTES_LIMIT = 32750
def _load_file(file_name):
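    """Load a JSON mapping file from the data directory next to this module."""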
pwd = os.path.dirname(__file__)
with open(f"{pwd}/data/{file_name}") as f:
return json.load(f)
class MappingSet:
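    """Mapping tables loaded once at import time from the bundled JSON files."""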
mitre_tag_mapping = _load_file("mitre_tag_mapping.json")
sae_tag_mapping = _load_file("sae_filter_mapping.json")
operation_mapping = _load_file("operation_mapping.json")
meta_key_mapping = _load_file("meta_key_mapping.json")
file_true_type_mapping = _load_file("file_true_type_mapping.json")
file_sub_true_type_mapping = _load_file("file_sub_true_type_mapping.json")
def _append_value_into_json_field(json_data, field, value):
    if field not in json_data:
        json_data[field] = []
    if value and value not in json_data[field]:
        json_data[field].append(value)
def process_workbench_impact_scope(json_data):
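    """Flatten impactScope entities into list fields on the workbench log.

    Account entities (optionally "DOMAIN\\user") populate UserAccountNTDomain and
    UserAccountName; host entities populate HostHostName.
    """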
for impact_scope in json_data["impactScope"]:
entity_type = impact_scope["entityType"]
if entity_type == "account":
account_value = impact_scope["entityValue"].split("\\")
if len(account_value) == 2:
account = account_value[1]
_append_value_into_json_field(
json_data, "UserAccountNTDomain", account_value[0]
)
else:
account = account_value[0]
_append_value_into_json_field(json_data, "UserAccountName", account)
elif entity_type == "host":
_append_value_into_json_field(
json_data, "HostHostName", impact_scope["entityValue"]["name"]
)
return json_data
def process_workbench_indicators(json_data):
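    """Copy indicator objectValue entries into the top-level columns mapped by
    XDR_INDICATORS_COLUMN_NAME, keyed by the indicator objectType.
    """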
for indicator in json_data["indicators"]:
object_type = indicator["objectType"]
        if object_type in XDR_INDICATORS_COLUMN_NAME:
_append_value_into_json_field(
json_data,
XDR_INDICATORS_COLUMN_NAME[object_type],
indicator["objectValue"],
)
return json_data
def customize_workbench_json(clp_id, workbench_detail, workbench_record):
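    """Build a flattened workbench log from the allowed record and detail columns,
    attach the customer id, summarize impact scope and indicators, and trim
    oversized fields.
    """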
xdr_log = {}
    for column in ALLOWED_WORKBENCH_COLUMN:
        xdr_log[column] = workbench_record.get(column, "")
    for column in ALLOWED_WORKBENCH_DETAIL_COLUMN:
        xdr_log[column] = workbench_detail.get(column, "")
xdr_log["xdrCustomerID"] = clp_id
xdr_log["impactScope_Summary"] = json.dumps(workbench_detail["impactScope"])
xdr_log = process_workbench_impact_scope(xdr_log)
xdr_log = process_workbench_indicators(xdr_log)
process_json_oversize(xdr_log, WB_OVERSIZED_FIELDS)
return xdr_log
def transform_rca_task(xdr_customer_id, workbench_id, data):
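    """Attach the customer id and workbench id to an RCA task record."""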
result = {"xdrCustomerID": xdr_customer_id, "workbenchId": workbench_id}
result.update(data)
return result
def transform_rca_result(target_info, data):
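    """Flatten an RCA chain into one record per object.

    Each record carries the workbench info from target_info, the object's parent
    (resolved from the chain links), its converted meta entries, and the link
    events that originate from it.
    """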
result = []
link_array = data["chain"]["links"]
(parent_set, event_set) = _convert_link(link_array)
object_array = data["chain"]["objects"]
obj_mapping = {
"objectHashId": "objectHashId",
"eventId": "eventId",
"objectName": "name",
"isMatched": "isMatched",
}
for obj_item in object_array:
# append workbench info
result_item = target_info.copy()
# process object info
        for result_key, source_key in obj_mapping.items():
            result_item[result_key] = obj_item[source_key]
        # process parent id
        obj_id = result_item["objectHashId"]
        result_item["parentObjectId"] = parent_set.get(obj_id)
# process object meta
result_item["objectMeta"] = _convert_meta(obj_item["meta"])
# process object event
if obj_id in event_set:
result_item["objectEvent"] = event_set[obj_id]
process_json_oversize(result_item, RCA_RESULT_OVERSIZED_FIELDS)
result.append(result_item)
return result
def extract_allowed_oat_fields(oat_log: dict):
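    """Keep only the allowed top-level and detail fields of an OAT log."""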
filtered_oat_log = {"detail": {}}
for field in ALLOWED_OAT_FIELDS:
if field in oat_log:
filtered_oat_log[field] = oat_log[field]
for field in ALLOWED_OAT_DETAIL_FIELDS:
if field in oat_log.get("detail", {}):
filtered_oat_log["detail"][field] = oat_log["detail"][field]
return filtered_oat_log
def translate_oat_fields(oat_log: dict):
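    """Convert selected top-level OAT fields in place: tags to name/value pairs
    and processTrueType to its mapped value.
    """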
_META_TRANSFORM_FIELDS = {
"tags": _convert_tag,
"processTrueType": _convert_true_type_int,
}
for field, convert_func in _META_TRANSFORM_FIELDS.items():
if field in oat_log:
oat_log[field] = convert_func(oat_log[field])
def transform_oat_log(clp_id, log):
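    """Filter an OAT log to the allowed fields, translate tag and true-type codes,
    attach the customer id, and trim oversized fields.
    """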
log = extract_allowed_oat_fields(log)
translate_oat_fields(log)
log["xdrCustomerId"] = clp_id
process_json_oversize(log, OAT_OVERSIZED_FIELDS)
return log
def _convert_true_type_int(true_type):
    key = str(true_type)
    if key in MappingSet.file_true_type_mapping:
        return MappingSet.file_true_type_mapping[key]
    logger.error(f"Key {true_type} not in FILE_TRUE_TYPE_MAPPING.")
    return true_type
def _convert_true_type(params):
key = str(params[0])
if key in MappingSet.file_true_type_mapping:
return MappingSet.file_true_type_mapping[key]
else:
logger.error(f"Key {params[0]} not in FILE_TRUE_TYPE_MAPPING.")
return params[0]
def _convert_sub_true_type(params):
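    """Map a sub-true-type code to its mapped value.

    params[0] is the sub-true-type code and params[1] is the true-type code used
    to select the sub-type mapping table; the original code is returned when no
    mapping is found.
    """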
sub_true_type = str(params[0])
true_type = str(params[1])
if true_type in MappingSet.file_true_type_mapping:
if true_type in MappingSet.file_sub_true_type_mapping:
if sub_true_type in MappingSet.file_sub_true_type_mapping[true_type]:
return MappingSet.file_sub_true_type_mapping[true_type][sub_true_type]
logger.error(
f"Connot find sub true type mapping. true_type: {true_type}, sub_true_type: {sub_true_type}"
)
return sub_true_type
def _convert_int(params):
    return int(params[0])
def _convert_link(link_array):
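    """Build parent and event lookups from RCA chain links.

    Returns a tuple (parent_set, event_set): parent_set maps a target object hash
    to its source object hash, and event_set maps a source object hash to the list
    of links originating from it. Operation codes and tags are mapped to readable
    values and eventTime is dropped from each link.
    """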
parent_set = {}
event_set = {}
for link_item in link_array:
link_item.pop("eventTime")
if "tags" in link_item:
link_item["tags"] = _convert_tag(link_item["tags"])
link_item["firstSeen"] = int(link_item["firstSeen"])
link_item["lastSeen"] = int(link_item["lastSeen"])
# mapping operation
operation_key = str(link_item["operation"])
if operation_key in MappingSet.operation_mapping:
link_item["operation"] = MappingSet.operation_mapping[operation_key]
else:
logger.warning(f'operation: "{link_item["operation"]}" not in mapping set.')
src_obj = link_item["srcObj"]
tar_obj = link_item["tarObj"]
if src_obj in event_set:
event_set[src_obj].append(link_item.copy())
else:
event_set[src_obj] = [link_item.copy()]
if tar_obj in parent_set:
if parent_set[tar_obj] != src_obj:
logger.warning(
f'target: "{tar_obj}" different in parent_set: {parent_set[tar_obj]}.'
)
parent_set[tar_obj] = src_obj
return parent_set, event_set
def _convert_tag(tag_array):
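    """Map XSAE and MITRE tag codes to {"name": tag, "value": display name} pairs;
    unknown tags keep the original tag as the value.
    """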
result = []
if tag_array:
for tag in tag_array:
if tag.startswith("XSAE"):
tag_key = tag[tag.index(".") + 1 :]
value = (
MappingSet.sae_tag_mapping[tag_key]["name"]
if tag_key in MappingSet.sae_tag_mapping
else tag
)
elif tag.startswith("MITRE"):
tag_key = tag[tag.index(".") + 1 :]
value = (
MappingSet.mitre_tag_mapping[tag_key]["name"]
if tag_key in MappingSet.mitre_tag_mapping
else tag
)
else:
logger.warning(f"Tag {tag} not in Mapping set.")
value = tag
result.append({"name": tag, "value": value})
return result
def _convert_meta(meta_object):
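    """Convert an RCA object's meta dict into a list of {"name", "value"} entries,
    renaming keys via meta_key_mapping and converting selected values (seen and
    launch times to int, true-type codes to their mapped values).
    """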
result = []
_META_VALUE_FUNC = {
"objectFirstSeen": [_convert_int],
"objectLastSeen": [_convert_int],
"processLaunchTime": [_convert_int],
"processTrueType": [_convert_true_type],
"processSubTrueType": [
_convert_sub_true_type,
"110",
],
}
for key in meta_object.keys():
if key in MappingSet.meta_key_mapping:
name = MappingSet.meta_key_mapping[key]
else:
# Unknown field name
logger.warning(f'rca meta key: "{key}" not in mapping set.')
name = key
        if name in _META_VALUE_FUNC:
            func_value_list = _META_VALUE_FUNC[name]
            params = [meta_object[key]]
            exec_func = None
            for item in func_value_list:
                if callable(item):
                    exec_func = item
                else:
                    # non-callable entries name other meta keys whose raw values
                    # are passed as extra conversion parameters
                    params.append(meta_object[item])
            if exec_func:
                value = exec_func(params)
            else:
                logger.error(f"Do not have function in _META_VALUE_FUNC: {name}")
                # fall back to the raw value so the entry is still emitted
                value = meta_object[key]
        else:
            value = meta_object[key]
        result.append({"name": name, "value": value})
return result
def _trim_oversized_json(json_data: dict, field_name: str, field_data: Any) -> bool:
"""Check if the field data in (list, str) type is over the limit size.
if field_data is list, check if the total size of the list is over the limit.
if field_data is string, check if the size of the string is over the limit.
Args:
json_data (dict): json data
field_name (str): field name
field_data (Any): field data
Returns:
bool: if the field data is over the limit size
"""
is_oversized = False
if (
isinstance(field_data, list)
and len(orjson.dumps(field_data)) > FIELD_BYTES_LIMIT
):
field_size = 0
data_count = 0
is_oversized = True
for item in field_data:
            # serialized item length plus the "," list separator
field_size += len(orjson.dumps(item)) + 1
if field_size > FIELD_BYTES_LIMIT:
logger.warning(
f"[process_field_over_size] {field_name} over size. "
f"data_count last: {data_count}. "
f"total_data_count: {len(field_data)}"
)
del field_data[data_count:]
break
data_count += 1
elif (
isinstance(field_data, str)
and len(bytes_string := field_data.encode()) > FIELD_BYTES_LIMIT
):
is_oversized = True
logger.warning(
f"[process_field_over_size] {field_name} over size. "
f"field_size: {len(bytes_string)}."
)
field_data = bytes_string[:FIELD_BYTES_LIMIT].decode(errors="ignore")
inner_field_keys = field_name.split(".")
if len(inner_field_keys) == 1:
json_data[inner_field_keys[0]] = field_data
elif len(inner_field_keys) == 2:
json_data[inner_field_keys[0]][inner_field_keys[1]] = field_data
else:
logger.error(f"[process_field_over_size] {field_name} depth is not supported.")
return is_oversized
def process_json_oversize(
json_data: dict, oversized_fields: Optional[List[List[str]]] = None
):
"""Do trimming if the field data in (list, str) type is over the limit size.
If no oversized_fields provided, do nothing.
Args:
json_data (_type_): json data
"""
if not oversized_fields:
return
for inner_fields in oversized_fields:
field_data = json_data
for field in inner_fields:
field_data = field_data.get(field, {})
is_oversized = False
if field_data:
            is_oversized = _trim_oversized_json(
                json_data, ".".join(inner_fields), field_data
            )
if is_oversized:
json_data[".".join(inner_fields) + "_over_size"] = is_oversized